home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/env python
-
- ## system-config-printer
-
- ## Copyright (C) 2008 Red Hat, Inc.
- ## Copyright (C) 2008 Till Kamppeter <till.kamppeter@gmail.com>
-
- ## This program is free software; you can redistribute it and/or modify
- ## it under the terms of the GNU General Public License as published by
- ## the Free Software Foundation; either version 2 of the License, or
- ## (at your option) any later version.
-
- ## This program is distributed in the hope that it will be useful,
- ## but WITHOUT ANY WARRANTY; without even the implied warranty of
- ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- ## GNU General Public License for more details.
-
- ## You should have received a copy of the GNU General Public License
- ## along with this program; if not, write to the Free Software
- ## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
- import urllib, httplib, platform, threading, tempfile, traceback
- import os, sys
- from xml.etree.ElementTree import XML
- from . import Device
-
- __all__ = ['OpenPrinting']
-
- def _normalize_space (text):
- result = text.strip ()
- result = result.replace ('\n', ' ')
- i = result.find (' ')
- while i != -1:
- result = result.replace (' ', ' ')
- i = result.find (' ')
- return result
-
- class _QueryThread (threading.Thread):
- def __init__ (self, parent, parameters, callback, user_data=None):
- threading.Thread.__init__ (self)
- self.parent = parent
- self.parameters = parameters
- self.callback = callback
- self.user_data = user_data
-
- self.setDaemon (True)
-
- def run (self):
- # CGI script to be executed
- query_command = "/query.cgi"
- # Headers for the post request
- headers = {"Content-type": "application/x-www-form-urlencoded",
- "Accept": "text/plain"}
- params = ("%s&uilanguage=%s&locale=%s" %
- (urllib.urlencode (self.parameters),
- self.parent.language[0],
- self.parent.language[0]))
- self.url = "http://%s%s?%s" % (self.parent.base_url, query_command, params)
- # Send request
- result = None
- status = 1
- try:
- conn = httplib.HTTPConnection(self.parent.base_url)
- conn.request("POST", query_command, params, headers)
- resp = conn.getresponse()
- status = resp.status
- if status == 200:
- result = resp.read()
- conn.close()
- except:
- result = sys.exc_info ()
-
- if status == 200:
- status = 0
-
- if self.callback != None:
- self.callback (status, self.user_data, result)
-
class OpenPrinting:
    """
    Client for the OpenPrinting printer and driver database.

    Every query is run in a background thread; the XML reply is parsed
    into plain Python structures and handed to a caller-supplied
    callback.
    """

    def __init__(self, language=None):
        """
        @param language: locale to request results in.  When None,
        the result of locale.getlocale (locale.LC_MESSAGES) is used,
        or 'C' if that raises locale.Error.
        @type language: string or sequence
        """
        # NOTE(review): the query thread sends language[0], so a
        # (name, encoding) tuple is sent as its name while a plain
        # string is sent as its first character -- confirm what
        # callers are expected to pass.
        if language is None:
            import locale
            try:
                language = locale.getlocale (locale.LC_MESSAGES)
            except locale.Error:
                language = 'C'
        self.language = language

        # XXX Read configuration file.
        self.base_url = "www.openprinting.org"

        # Restrictions on driver choices XXX Parameters to be taken from
        # config file
        self.onlyfree = 0
        self.onlymanufacturer = 0

    def cancelOperation(self, handle):
        """
        Cancel an operation.

        The query itself is not interrupted; only its callback is
        suppressed.

        @param handle: query/operation handle
        """
        # Just prevent the callback.  This is best-effort: a handle
        # that cannot take the attribute is silently ignored.
        try:
            handle.callback = None
        except (AttributeError, TypeError):
            pass

    def webQuery(self, parameters, callback, user_data=None):
        """
        Run a web query for a driver.

        @type parameters: dict
        @param parameters: URL parameters
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status
        code, zero for success
        @param user_data: value handed through to the callback
        @return: query handle
        """
        the_thread = _QueryThread (self, parameters, callback, user_data)
        the_thread.start ()
        return the_thread

    @staticmethod
    def _deliver (callback, status, user_data, payload):
        """
        Call a result callback, printing the traceback rather than
        propagating it if the callback itself raises.
        """
        try:
            callback (status, user_data, payload)
        except Exception:
            (exc_type, exc_value, tb) = sys.exc_info ()
            for line in traceback.format_tb (tb):
                print (line.strip ())
            summary = traceback.format_exception_only (exc_type, exc_value)
            print (summary[0].strip ())

    def searchPrinters(self, searchterm, callback, user_data=None):
        """
        Search for printers using a search term.

        @type searchterm: string
        @param searchterm: search term
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status
        code, zero for success.  On success the result is a dict
        mapping foomatic printer ID to "make model" display name; if
        parsing fails the status is 1 and the result is a
        sys.exc_info () tuple.
        @param user_data: value handed through to the callback
        @return: query handle
        """

        def parse_result (status, data, result):
            (callback, user_data) = data
            if status != 0:
                # The query itself failed: pass the failure straight on.
                callback (status, user_data, result)
                return

            status = 0
            printers = {}
            try:
                root = XML (result)
                # We store the printers as a dict of:
                # foomatic_id: displayname
                for printer in root.findall ("printer"):
                    ident = printer.find ("id")
                    make = printer.find ("make")
                    model = printer.find ("model")
                    if ident is None or make is None or model is None:
                        continue
                    if ident.text and make.text and model.text:
                        printers[ident.text] = make.text + " " + model.text
            except Exception:
                status = 1
                printers = sys.exc_info ()

            self._deliver (callback, status, user_data, printers)

        # Common parameters for the request
        params = { 'type': 'printers',
                   'printer': searchterm,
                   'format': 'xml' }
        return self.webQuery (params, parse_result, (callback, user_data))

    @staticmethod
    def _parse_driver (driver):
        """
        Convert one <driver> element into a driver dictionary.

        The dictionary has the form:
          { 'name': name,
            'url': url,
            'supplier': supplier,
            'license': short license string e.g. GPLv2,
            'licensetext': license text (Plain text),
            'shortdescription': short description,
            'nonfreesoftware': Boolean,
            'freesoftware': Boolean (inverse of 'nonfreesoftware'),
            'recommended': Boolean,
            'patents': Boolean,
            'thirdpartysupplied': Boolean,
            'manufacturersupplied': Boolean,
            'supportcontacts' (optional):
              list of { 'name', 'url', 'level' }
            'functionality' (optional):
              { 'text', 'lineart', 'graphics', 'photo', 'speed' },
              each holding the text of the corresponding element
            'packages' (optional):
              { arch:
                { file:
                  { 'url': url,
                    'realversion': upstream version string,
                    'version': packaged version string,
                    'release': package release string,
                    'pkgsys': text of the pkgsys element,
                    'repositories' (optional): { tag: text }
                  }
                }
              }
            'ppds' (optional):
              URL string list
          }
        There is more information in the raw XML, but this can be
        added to the Python structure as needed.

        @return: the driver dictionary, or None if the element has no
        'name' or no 'url' text
        """
        info = {}
        for attribute in ['name', 'url', 'supplier', 'license',
                          'shortdescription']:
            element = driver.find (attribute)
            if element is not None and element.text is not None:
                info[attribute] = _normalize_space (element.text)

        # A driver without a name or a URL is of no use to the caller.
        if 'name' not in info or 'url' not in info:
            return None

        element = driver.find ('licensetext')
        if element is not None:
            info['licensetext'] = element.text

        # These are flags: the mere presence of the element means True.
        for flag in ['nonfreesoftware', 'recommended',
                     'patents', 'thirdpartysupplied',
                     'manufacturersupplied']:
            info[flag] = driver.find (flag) is not None

        # Make a 'freesoftware' tag for compatibility with
        # how the OpenPrinting API used to work (see trac
        # #74).
        info['freesoftware'] = not info['nonfreesoftware']

        supportcontacts = []
        container = driver.find ('supportcontacts')
        if container is not None:
            for sc in container.findall ('supportcontact'):
                name = ""
                if sc.text is not None:
                    name = _normalize_space (sc.text)
                supportcontacts.append ({ 'name': name,
                                          'url': sc.attrib.get ('url'),
                                          'level': sc.attrib.get ('level') })
        if supportcontacts:
            info['supportcontacts'] = supportcontacts

        container = driver.find ('functionality')
        if container is not None:
            functionality = {}
            for attribute in ['text', 'lineart', 'graphics',
                              'photo', 'speed']:
                element = container.find (attribute)
                if element is not None:
                    functionality[attribute] = element.text
            if functionality:
                info['functionality'] = functionality

        packages = {}
        container = driver.find ('packages')
        if container is not None:
            # Each child of <packages> is named after an architecture.
            for arch in container:
                rpms = {}
                for package in arch.findall ('package'):
                    rpm = {}
                    for attribute in ['realversion', 'version',
                                      'release', 'url', 'pkgsys']:
                        element = package.find (attribute)
                        if element is not None:
                            rpm[attribute] = element.text

                    repositories = package.find ('repositories')
                    if repositories is not None:
                        for pkgsys in repositories:
                            repos = rpm.setdefault ('repositories', {})
                            repos[pkgsys.tag] = pkgsys.text

                    rpms[package.attrib['file']] = rpm
                packages[arch.tag] = rpms
        if packages:
            info['packages'] = packages

        container = driver.find ('ppds')
        if container is not None:
            ppds = [each.text for each in container]
            if ppds:
                info['ppds'] = ppds

        return info

    def listDrivers(self, model, callback, user_data=None, extra_options=None):
        """
        Obtain a list of printer drivers.

        @type model: string or cupshelpers.Device
        @param model: foomatic printer model string or a cupshelpers.Device
        object
        @type callback: function
        @param callback: callback function, taking (integer, user_data,
        result) parameters with the first parameter being the status
        code, zero for success.  On success the result is a dict
        mapping driver ID to a driver dictionary; if parsing fails the
        status is 1 and the result is a sys.exc_info () tuple.
        @param user_data: value handed through to the callback
        @type extra_options: string -> string dictionary
        @param extra_options: Additional search options, see
        http://www.linuxfoundation.org/en/OpenPrinting/Database/Query
        @return: query handle
        """

        def parse_result (status, data, result):
            (callback, user_data) = data
            if status != 0:
                # The query itself failed: report it once and stop,
                # rather than going on to parse a non-existent reply.
                callback (status, user_data, result)
                return

            try:
                root = XML (result)
                drivers = {}
                for driver in root.findall ('driver'):
                    driver_id = driver.attrib.get ('id')
                    if driver_id is None:
                        continue

                    info = self._parse_driver (driver)
                    if info is not None:
                        drivers[driver_id] = info
                status = 0
                payload = drivers
            except Exception:
                status = 1
                payload = sys.exc_info ()

            # The callback runs outside the parsing 'try' so that an
            # error raised by the callback is not reported back to it
            # as a parse failure.
            self._deliver (callback, status, user_data, payload)

        if isinstance (model, Device):
            model = model.id

        params = { 'type': 'drivers',
                   'moreinfo': '1',
                   'showprinterid': '1',
                   'onlynewestdriverpackages': '1',
                   'architectures': platform.machine (),
                   'noobsoletes': '1',
                   'onlyfree': str (self.onlyfree),
                   'onlymanufacturer': str (self.onlymanufacturer),
                   'printer': model,
                   'format': 'xml' }
        if extra_options:
            params.update (extra_options)
        return self.webQuery (params, parse_result, (callback, user_data))
-
def _simple_gui ():
    """
    Run a small GTK dialog for trying out OpenPrinting queries by hand.

    The dialog has a text entry, a text view for results, and "Search"
    and "List" buttons which call searchPrinters and listDrivers with
    the entry's text.  Blocks in gtk.main () until the dialog is closed.
    """
    import gtk, pprint
    gtk.gdk.threads_init ()

    # Response ids given to the two custom dialog buttons.
    response_search = 10
    response_list = 20

    class QueryApp:
        def __init__(self):
            self.openprinting = OpenPrinting ()
            self.main = gtk.Dialog ("OpenPrinting query application",
                                    None,
                                    gtk.DIALOG_MODAL | gtk.DIALOG_NO_SEPARATOR,
                                    (gtk.STOCK_CLOSE, gtk.RESPONSE_CLOSE,
                                     "Search", response_search,
                                     "List", response_list))
            self.main.set_border_width (6)
            self.main.vbox.set_spacing (2)
            vbox = gtk.VBox (False, 6)
            self.main.vbox.pack_start (vbox, True, True, 0)
            vbox.set_border_width (6)
            self.entry = gtk.Entry ()
            vbox.pack_start (self.entry, False, False, 6)
            sw = gtk.ScrolledWindow ()
            self.tv = gtk.TextView ()
            sw.add (self.tv)
            vbox.pack_start (sw, True, True, 6)
            self.main.connect ("response", self.response)
            self.main.show_all ()

        def response (self, dialog, response):
            if (response == gtk.RESPONSE_CLOSE or
                response == gtk.RESPONSE_DELETE_EVENT):
                gtk.main_quit ()

            if response == response_search:
                # Run a query.
                self.openprinting.searchPrinters (self.entry.get_text (),
                                                  self.search_printers_callback)

            if response == response_list:
                self.openprinting.listDrivers (self.entry.get_text (),
                                               self.list_drivers_callback)

        def _show_text (self, text):
            # The callbacks below are invoked from the query thread,
            # so the GDK lock is taken around the widget update.  It
            # is released in 'finally' so that a failing update cannot
            # leave the lock held.
            gtk.gdk.threads_enter ()
            try:
                self.tv.get_buffer ().set_text (text)
            finally:
                gtk.gdk.threads_leave ()

        def search_printers_callback (self, status, user_data, printers):
            if status != 0:
                # On failure 'printers' is a sys.exc_info () tuple.
                raise printers[1]

            self._show_text ("".join (printer + "\n"
                                      for printer in printers.values ()))

        def list_drivers_callback (self, status, user_data, drivers):
            if status != 0:
                # On failure 'drivers' is a sys.exc_info () tuple.
                raise drivers[1]

            self._show_text (pprint.pformat (drivers))

        def query_callback (self, status, user_data, result):
            text = str (result)
            self._show_text (text)
            # Keep a copy of the raw result; the file is closed
            # explicitly instead of being left to the garbage collector.
            with open ("result.xml", "w") as output:
                output.write (text)

    q = QueryApp ()
    gtk.main ()
-